package com.tl.spark.scala

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @program: spark-test
  * @description:
  * @author: dong.tl
  * @create: 2018-09-19 18:09
  **/
/**
  * Recursively scans a local directory tree and writes every regular file
  * into the HBase table "file_data": one row per file, keyed by
  * "parentDirName|fileName", carrying path/name/size/type/mtime plus the
  * file's raw bytes, bulk-loaded through Spark's saveAsNewAPIHadoopDataset.
  *
  * @author dong.tl
  * @since 2018-09-19
  */
object ScalaFilePutHbase {

  /** Single column family that holds all file attributes. */
  private val Family: Array[Byte] = Bytes.toBytes("file_info")

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ScalaFilePutHbase")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    try {
      val job = createJob(createHBaseConfiguration())

      // Materialize eagerly: the file iterator is single-use and
      // Iterator.toSeq produces a lazy Stream, which is unsafe to hand
      // to parallelize. Vector is strict and serializable.
      val puts = subdirs3(new File("F:\\data\\dome")).map(toPut).toVector

      sc.parallelize(puts).saveAsNewAPIHadoopDataset(job.getConfiguration)
    } finally {
      // Always release the SparkContext, even if the write fails.
      sc.stop()
    }
  }

  /**
    * Builds the (key, Put) pair for one file.
    * Rowkey layout: "&lt;parent directory name&gt;|&lt;file name&gt;".
    * NOTE(review): readAllBytes loads the whole file into memory — assumes
    * inputs are small enough to fit; confirm for large files.
    */
  private def toPut(file: File): (ImmutableBytesWritable, Put) = {
    val name = file.getName
    val rowkey = Bytes.toBytes(file.getParentFile.getName + "|" + name)
    val put = new Put(rowkey)
    put.addColumn(Family, Bytes.toBytes("path"), Bytes.toBytes(file.getParentFile.toString))
    put.addColumn(Family, Bytes.toBytes("name"), Bytes.toBytes(name))
    put.addColumn(Family, Bytes.toBytes("size"), Bytes.toBytes(file.length))
    // Extension after the last '.'; when there is no dot, lastIndexOf
    // returns -1 and the whole name is stored (original behavior).
    put.addColumn(Family, Bytes.toBytes("type"), Bytes.toBytes(name.substring(name.lastIndexOf(".") + 1)))
    put.addColumn(Family, Bytes.toBytes("time"), Bytes.toBytes(
      LocalDateTime.ofInstant(Instant.ofEpochMilli(file.lastModified), ZoneId.of("Asia/Shanghai")).toString))
    put.addColumn(Family, Bytes.toBytes("bytes"), Files.readAllBytes(Paths.get(file.getAbsolutePath)))
    // Key the writable with the rowkey (TableOutputFormat ignores the key,
    // but an empty ImmutableBytesWritable is a landmine for later reuse).
    (new ImmutableBytesWritable(rowkey), put)
  }

  /**
    * Configures a Hadoop Job for HBase table output.
    *
    * Fix: the output value class must be [[Put]] — that is what the RDD
    * actually emits and what TableOutputFormat expects (Put/Delete).
    * The original declared [[Result]], a read-side type.
    */
  def createJob(conf: Configuration): Job = {
    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job
  }

  /**
    * Builds the HBase client configuration.
    * ZooKeeper quorum/port are set programmatically here instead of relying
    * on an hbase-site.xml on the classpath.
    */
  def createHBaseConfiguration(): Configuration = {
    val conf = HBaseConfiguration.create()
    // ZooKeeper quorum hosts backing the HBase cluster.
    conf.set(HConstants.ZOOKEEPER_QUORUM, "db01,db02,db03")
    // ZooKeeper client port (2181 is also the default).
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Destination table for TableOutputFormat.
    conf.set(TableOutputFormat.OUTPUT_TABLE, "file_data")
    conf
  }

  /**
    * Recursively lists every regular file under `dir`: the directory's own
    * files first, then the contents of each subdirectory (depth-first).
    *
    * Fix: `File.listFiles()` returns null when `dir` is not a directory or
    * an I/O error occurs; the original dereferenced it unconditionally and
    * would throw NPE. Such directories now yield an empty iterator.
    */
  def subdirs3(dir: File): Iterator[File] = {
    Option(dir.listFiles()) match {
      case None => Iterator.empty
      case Some(entries) =>
        val (dirs, files) = entries.partition(_.isDirectory)
        files.iterator ++ dirs.iterator.flatMap(subdirs3)
    }
  }

}
